[BEAM-10435] Add ValidatesRunner task for local_job_service and Java SDK harness #11792
1cdd67b
to
46252f5
Compare
I got this to run and hit some errors, but debugging is pretty awful. I never did get the local_job_service to actually log everything to Gradle that it does to the console. To debug this, it is easiest to run a separate job service and point the build at it with -PlocalJobServicePortFile.
I got two different errors at different times, one of which looks like a Python URN being sent to the Java SDK harness and the other like a Java URN being resolved by the Python runner (?)
RuntimeError: java.lang.IllegalStateException: No factory registered for beam:transform:read_from_impulse_python:v1, known factories [beam:transform:window_into:v1, beam:runner:source:v1, beam:runner:sink:v1, beam:source:java:0.1, beam:transform:read:v1, beam:transform:combine_per_key_precombine:v1, beam:transform:combine_per_key_merge_accumulators:v1, beam:transform:combine_per_key_extract_outputs:v1, beam:transform:combine_per_key_convert_to_accumulators:v1, beam:transform:combine_grouped_values:v1, beam:transform:flatten:v1, beam:transform:pardo:v1, beam:transform:sdf_pair_with_restriction:v1, beam:transform:sdf_split_restriction:v1, beam:transform:sdf_split_and_size_restrictions:v1, beam:transform:sdf_process_elements:v1, beam:transform:sdf_process_sized_element_and_restrictions:v1, beam:transform:map_windows:v1, beam:transform:merge_windows:v1]
at org.apache.beam.fn.harness.control.ProcessBundleHandler$UnknownPTransformRunnerFactory.createRunnerForPTransform(ProcessBundleHandler.java:792)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:236)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:491)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:275)
at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:552)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:270)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
self._pipeline_proto)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
bundle_context_manager,
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
bundle_manager)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 930, in process_bundle
timer_inputs)):
File "/Users/klk/.virtualenvs/ulr-vr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 641, in result_iterator
yield fs.pop().result()
File "/Users/klk/.virtualenvs/ulr-vr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 462, in result
return self.__get_result()
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 926, in execute
dry_run)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 859, in process_bundle
output.transform_id).append(output.data)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 617, in get_buffer
pcollections[output_pcoll].windowing_strategy_id])
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 137, in __getitem__
return self.get_by_id(id)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/transforms/core.py", line 2396, in from_runner_api
windowfn=WindowFn.from_runner_api(proto.window_fn, context),
File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/utils/urns.py", line 186, in from_runner_api
parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: u'beam:window_fn:serialized_java:v1'
Any idea where I should start debugging for an inversion of Java vs Python?
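For readers following the second traceback: the KeyError comes from a plain dictionary lookup of the URN in a per-class registry. Below is a minimal sketch of that pattern, not Beam's actual code. The `UrnRegistry` class and the registered constructor are hypothetical; only the `_known_urns` lookup and the Java URN are taken from the traceback above.

```python
class UrnRegistry:
    """Toy stand-in for a per-SDK URN -> constructor table (hypothetical)."""

    def __init__(self):
        self._known_urns = {}

    def register(self, urn, constructor):
        self._known_urns[urn] = constructor

    def from_runner_api(self, urn, payload):
        # A bare self._known_urns[urn] would raise KeyError, as in the
        # traceback; here the error is wrapped to list what is registered.
        try:
            constructor = self._known_urns[urn]
        except KeyError:
            raise ValueError(
                'URN %r is not registered here; known URNs: %s' %
                (urn, sorted(self._known_urns)))
        return constructor(payload)


# Illustrative registration only; the constructor just returns a label.
python_window_fns = UrnRegistry()
python_window_fns.register(
    'beam:window_fn:global_windows:v1', lambda payload: 'GlobalWindows')
```

Looking up 'beam:window_fn:serialized_java:v1' in such a table fails because only the Java SDK knows how to decode it, which is consistent with the Python runner trying to deserialize a Java-specific windowing strategy.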
@@ -39,7 +39,7 @@
 public class ArtifactRetrievalService
     extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements FnService {

-  public static final int DEFAULT_BUFFER_SIZE = 4 << 20; // 4 MB
+  public static final int DEFAULT_BUFFER_SIZE = 2 << 20; // 2 MB
@@ -1,3 +1,13 @@
import groovy.json.JsonOutput

import java.nio.file.FileSystems
TODO(me): remove these imports
I first went through the "normal" route of using all this stuff to watch for the pid file but it was verbose and had race conditions. No point. Just check and sleep, now.
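A minimal sketch of the check-and-sleep approach, written in Python for illustration rather than the Groovy used in the build file. The function name, the timeout, and the poll interval are assumptions, not values from this PR.

```python
import os
import time


def wait_for_file(path, timeout_secs=30.0, poll_secs=0.5):
    """Polls until `path` exists; returns False if the timeout expires first."""
    deadline = time.monotonic() + timeout_secs
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_secs)
    return True
```

Bounding the loop with a deadline avoids hanging the build forever if the service never writes its pid file.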
You shouldn't have to watch. When the --background flag is set, it waits for the service to be up before terminating.
Great, I'll remove that. I think in an early draft I made the gradle task not wait for the process to terminate (because the daemonized process was causing the hang, but that turned out to be a different configuration)
    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
  }
  filter {
    includeTestsMatching 'ImpulseTest'
TODO(me): remove this once we get past the sanity checking phase
   * @param options Properties which configure the runner.
   * @return The newly created runner.
   */
  public static TestUniversalRunner fromOptions(PipelineOptions options) {
Had to add this, because TestPortableRunner couples "check that the job succeeds" logic with a bunch of other things having to do with launching an existing Java runner as a portable runner, not relevant to actual portable runner services.
  proc.waitFor();
}

task virtualenv {
I tried registering outputs.dir virtualenvDir and inputs.dir pythonSdkDir but it did not result in an incremental build.
Can we use the existing virtualenv tasks we have?
I didn't use them because I was attempting to do more proper idempotent/cached gradle tasks.
Oh hey, now I recall: applyPythonNature causes conflicting configurations. I would really like to move away from these nature things.
I wouldn't want Beam to move to a build setup where each Gradle file does its own thing, because the fragmentation will hurt debugging build issues and slow down rolling out build changes that impact more than one project.
One example where we decided to split a common setup was between releasing Java projects and releasing vendored projects, which led to fixes that weren't done in both places, leading to bugs that lasted for months.
I'm not suggesting moving each build.gradle to do their own thing. Beam Java modules and vendored libraries can share as much code as they like. They shouldn't share an entrypoint because they are different things.
In this case, the code suggested to be shared is coupled with creating a Beam Python module, which this is not. "Do Python things" is not an adequate or meaningful abstraction. Applying the vague blanket logic is a liability, even if it worked here, which it does not. It is likely that I can do some tweaks to applyPythonNature to "make it work", but that would be bad engineering.
- Adding near-duplicate code when maybe there is an abstraction: tech debt
- Adding a dependency on something that isn't ready/meant for it: tech debt
I interpret Robert's comment as an invitation to improve our build code into some kind of meaningful abstraction that can be shared without incurring yet more tech debt.
(I will explore this invitation once the tests are running properly)
(Adding a dependency on something that is ready and is meant for it, but is not polished and named to make clear that it is a logical necessity: tech debt)
File pidFile = new File(localJobServicePidFile)
int totalSleep = 0
while (!pidFile.exists()) {
  sleep(500)
We shouldn't have to wait, once the above exec completes it should be there (or not). Does the above task error if the return code is non-zero?
Removed. This code was left over from when I was struggling to get gradle to allow the thing to daemonize itself.
   subprocess.Popen([
       sys.executable,
       '-m',
       'apache_beam.runners.portability.local_job_service_main'
-  ] + argv)
+  ] + argv,
+  stderr=stderr_dest,
I'm not sure what happens here when this process exits (and possibly tries to close these files?).
I didn't read the subprocess code, and the docs are vague. The special subprocess.STDOUT token indicates that the output should be "captured" into the same file handle. Comments at https://stackoverflow.com/questions/31980411/closing-files-from-subprocess-stdout imply that closing the file is the responsibility of this process. I did not run the experiments suggested there. I also did not try to refactor this code to allow a with statement.
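A small experiment along the lines discussed here can be sketched as follows. It is an illustration under stated assumptions, not the code in local_job_service_main: the helper name `launch_with_log` and the child snippet are made up. It sends both streams of a child to one file via subprocess.STDOUT and closes the parent's handle right after launching.

```python
import subprocess
import sys


def launch_with_log(log_path, child_code):
    """Starts a Python child whose stdout and stderr both go to log_path."""
    log = open(log_path, mode='w')
    try:
        proc = subprocess.Popen(
            [sys.executable, '-c', child_code],
            stdout=log,
            # Special token: route stderr to wherever stdout is going.
            stderr=subprocess.STDOUT)
    finally:
        # The child holds its own copy of the descriptor, so closing the
        # parent's file object here does not cut off the child's output.
        log.close()
    return proc
```

If the child still writes to the log after the parent has closed its handle, that supports the reading that cleanup in the launching process does not affect the daemonized service.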
Once I get the suite running I can do some refactors to try to share with |
46252f5
to
ca74559
Compare
ca74559
to
56cec5f
Compare
Found a lot of exclusions before I started getting my disks filled by the build & local runner. @robertwb you may be interested in the failures where it seems empty side inputs don't work |
c466855
to
f156314
Compare
PTAL. I have a final set of exclusions, categorized, with all other tests passing locally. The local run took about an hour. I have adjusted |
I have curated the commits into orthogonal changes, if you wish to focus your review. |
f156314
to
cbb175a
Compare
e7254e2
to
da4acef
Compare
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/13653/ appears to be a failure in unrelated code. It does not look like something that would flake. Perhaps |
Run Python PreCommit |
Nice!
"--stdout_file=${localJobServiceStdoutFile}",
"--pid_file=${localJobServicePidFile}",
"--port_file=${localJobServicePortFile}"
//
These lines can be removed now, right?
Done
task ulrValidatesRunnerTests(type: Test) {
  dependsOn ":sdks:java:container:docker"

  if (!project.hasProperty("localJobServicePortFile")) {
Very convenient.
"--runner=TestUniversalRunner",
"--experiments=beam_fn_api",
"--localJobServicePortFile=${localJobServicePortFile}"
])
Consider passing --defaultEnvironmentType=LOOPBACK. You can then remove the docker dependency as well. (Maybe we could run one test with docker, but all of them seems overkill and expensive.)
I rather specifically want the docker dependency, to have a test of the true Java SDK harness container without the complexity of a production runner. But that can be postcommit and if LOOPBACK is faster and easier to debug that's good for precommit. I'd like to leave as-is to avoid churning this PR, but will follow up and create a LOOPBACK version prior to creating any Jenkins job.
Ack. I don't think it makes sense to run every test to verify the container/harness setup works (this is reminiscent of other threads) but definitely agree these choices can be postponed while we get this PR in.
    excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
  }
  filter {
    // There is not currently a category for excluding these _only_ in committed mode
Could you create JIRAs for these (at whatever granularity seems appropriate) and add them here?
Done. Also added a new Jira component runner-universal since I did not find one, in case there's a need to search for these.
          result.waitUntilFinish(),
          Matchers.is(PipelineResult.State.DONE));
      return result;
    } catch (IOException e) {
Nit: I'd make this more local (it's thrown only at Files.readAllBytes).
Done
   * A file containing the job service port, since Gradle needs to know this filename statically
   * to provide it in Beam testing options.
   */
  @Description("File containing local job service port.")
Logically, it would make sense to let this be optional (e.g. one could instead provide jobEndpoint directly). A point could be made that the testing infrastructure should be the one reading the file and setting jobEndpoint, rather than passing the file path as an option (but I don't know how much messier that'd make things).
Yea, this sort of thing is what took 90% of the time for this PR actually. Scraping around Gradle's docs and the internet for ways to insert that little bit of logic, and realizing it was sort of against the grain. Pipeline options are passed as a Java system property, and those are set up in the Gradle graph construction phase. More generally, there's not a Gradle graph execution-time slot for free-form code that also re-uses the Test task type. Perhaps they expect you to use inheritance and make a new Task type. Which I would rather not do ;_;
It would be fine to have two pipeline options, so that simple use could be simple.
We already have the plain jobEndpoint option, just make (re)setting it conditional on LocalJobServicePortFile being set.
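The suggested logic can be sketched like this, in Python for illustration even though the runner under review is Java. The function name, the assumption that the port file holds a bare port number, and the use of localhost are all hypothetical and not confirmed by this thread.

```python
def resolve_job_endpoint(job_endpoint=None, port_file=None):
    """Returns the endpoint to use, preferring the port file when given."""
    if port_file:
        # Assumed file format: a single integer port, optional whitespace.
        with open(port_file) as f:
            port = int(f.read().strip())
        return 'localhost:%d' % port
    if not job_endpoint:
        raise ValueError('Either job_endpoint or port_file must be set')
    return job_endpoint
```

With this shape, simple use stays simple: callers that already know the endpoint pass it directly, and only the Gradle-driven tests need the file indirection.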
Done
if not options.stdout_file:
  raise RuntimeError('--stdout_file must be specified with --background')
stdout_dest = open(options.stdout_file, mode='w')
I think I mentioned this before, but is it an issue that these file descriptors might get closed on completion of this process?
Yea, I think my rebasing clobbered that thread. It is not an issue. Parent file descriptors are not closed. I think the PR front page still has the prior conversation, where you can find some links.
(I won't rebase from here on out, until review is done)
No problem. My comment was from something like a month ago. Thanks for the references.
LGTM. This is great.
Flakes in this run:
And then I just introduced an actual compile error in my fixups. |
b488bab
to
fec6672
Compare
On the portable precommit, 4 gradle scans succeeded and then the Gradle daemon crashed. |
Run Portable_Python PreCommit |
(it could have been interrupted, I didn't dig too deep) |
Flakes this time in Java precommit:
|
Run Java PreCommit |
OK this is known flake https://issues.apache.org/jira/browse/BEAM-10470 in |
This adds a ValidatesRunner suite for the Java SDK against the local Python ULR.